package drds.plus.executor.transaction.strict_write_with_non_transaction_cross_database_read;


import drds.plus.datanode.api.Connection;
import drds.plus.datanode.api.DatasourceManager;
import drds.plus.executor.ExecuteContext;
import drds.plus.executor.transaction.AbstractTransaction;
import drds.plus.executor.transaction.non_transaction_and_close_connection_when_commit.NonTransactionAndCloseConnectionWhenCommitConnectionHolder;
import drds.plus.executor.transaction.strict.StrictConnectionHolder;
import drds.plus.sql_process.abstract_syntax_tree.execute_plan.ExecutePlan;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

import java.sql.SQLException;
import java.util.Set;

/**
 * Transaction that pins all writes to a single data node and lets reads go to other
 * data nodes through a separate, non-transactional connection holder.
 *
 * <p>Write connections are obtained from a {@link StrictConnectionHolder} and have
 * auto-commit switched off. Reads that target the write data node reuse the write holder;
 * reads that target any other data node use a
 * {@link NonTransactionAndCloseConnectionWhenCommitConnectionHolder}.
 *
 * <p>Every public operation runs while holding the inherited {@code lock}.
 */
@Slf4j
public class StrictWriteWithNonTransactionCrossDatabaseReadTransaction extends AbstractTransaction {
    /** Holder for the connection(s) used by writes (and by reads on the write data node). */
    private final drds.plus.executor.transaction.ConnectionHolder writeConnectionHolder;
    /** Holder for connections used by reads on data nodes other than the write data node. */
    private final drds.plus.executor.transaction.ConnectionHolder readConnectionHolder;
    /** Composite holder built from the write and read holders; exposed by {@link #getConnectionHolder()}. */
    private final drds.plus.executor.transaction.ConnectionHolder connectionHolder;
    /**
     * The data node currently taking part in the transaction; {@code null} until the first write.
     */
    protected String writeDataNodeId = null;
    /** The connection handed out by the most recent write request; {@code null} until the first write. */
    protected Connection writeConnection = null;

    /**
     * Creates the transaction together with its write holder, read holder and the composite holder.
     *
     * @param executeContext execution context passed on to the super class
     */
    public StrictWriteWithNonTransactionCrossDatabaseReadTransaction(ExecuteContext executeContext) {
        super(executeContext);
        writeConnectionHolder = new StrictConnectionHolder();
        readConnectionHolder = new NonTransactionAndCloseConnectionWhenCommitConnectionHolder();
        this.connectionHolder = new StrictWriteWithNonTransactionCrossDatabaseReadConnectionHolder(writeConnectionHolder, readConnectionHolder);
    }

    /**
     * Returns a connection for the given data node.
     *
     * <p>For reads: the write holder is used when {@code dataNodeId} names the write data node
     * (compared case-insensitively, the same way the write path compares it), otherwise the
     * read holder is used.
     *
     * <p>For writes: the first write pins {@link #writeDataNodeId}. Later writes must name the
     * same data node or pass {@link ExecutePlan#USE_LAST_DATA_NODE}; auto-commit is disabled on
     * the returned connection.
     *
     * @param dataNodeId        id of the requested data node, must not be {@code null}
     * @param datasourceManager data source manager handed to the connection holder
     * @param readWrite         whether the connection is needed for a read or for a write
     * @return the connection to use
     * @throws SQLException             if the holder fails to provide a connection or auto-commit
     *                                  cannot be disabled
     * @throws IllegalArgumentException if a write targets a data node other than the pinned one
     */
    public Connection getConnection(@NonNull String dataNodeId, DatasourceManager datasourceManager, ReadWrite readWrite) throws SQLException {
        lock.lock();
        try {
            if (readWrite == ReadWrite.read) {
                // equalsIgnoreCase(null) is false, so reads before the first write go to the read holder.
                if (dataNodeId.equalsIgnoreCase(writeDataNodeId)) {
                    // Look up by the pinned id, exactly like the write path, so that a differently
                    // cased id cannot be treated as a different node by the holder.
                    return this.writeConnectionHolder.getConnection(writeDataNodeId, datasourceManager);
                }
                return readConnectionHolder.getConnection(dataNodeId, datasourceManager);
            }

            // The first write decides which data node the transaction is bound to.
            // NOTE(review): if the very first write passes USE_LAST_DATA_NODE, that marker itself
            // gets pinned as the write data node - confirm whether callers can ever do that.
            if (writeDataNodeId == null) {
                writeDataNodeId = dataNodeId;
            }
            if (writeDataNodeId.equalsIgnoreCase(dataNodeId) || ExecutePlan.USE_LAST_DATA_NODE.equals(dataNodeId)) {
                Connection connection = this.writeConnectionHolder.getConnection(writeDataNodeId, datasourceManager);
                connection.setAutoCommit(false);
                this.writeConnection = connection;
                return connection;
            }
            throw new IllegalArgumentException(writeDataNodeId + "-" + dataNodeId);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Commits every connection of the write holder and then closes the transaction.
     *
     * <p>If a commit fails, the {@link SQLException} is rethrown wrapped in a
     * {@link RuntimeException} and {@link #close()} is not reached, so the transaction stays
     * open; {@link #rollback()} or {@link #close()} then releases the connections.
     *
     * @throws RuntimeException wrapping the {@link SQLException} of the failed commit
     */
    public void commit() throws RuntimeException {
        lock.lock();
        try {
            Set<Connection> connectionSet = this.writeConnectionHolder.getConnectionSet();
            for (Connection connection : connectionSet) {
                try {
                    connection.commit();
                } catch (SQLException e) {
                    throw new RuntimeException(e);
                }
            }
            close();
        } finally {
            lock.unlock();
        }

    }

    /**
     * Rolls back every connection of the write holder and then closes the transaction.
     *
     * <p>A failing rollback does not stop the remaining connections from being rolled back, and
     * the transaction is closed in every case so that no connection is left behind. The first
     * failure is thrown once the cleanup is done; later failures are attached as suppressed
     * exceptions.
     *
     * @throws RuntimeException wrapping the first {@link SQLException} raised by a rollback, or
     *                          the exception raised by {@link #close()}
     */
    public void rollback() throws RuntimeException {
        lock.lock();
        try {
            RuntimeException failure = null;
            Set<Connection> connectionSet = this.writeConnectionHolder.getConnectionSet();
            for (Connection connection : connectionSet) {
                try {
                    connection.rollback();
                } catch (SQLException e) {
                    if (failure == null) {
                        failure = new RuntimeException(e);
                    } else {
                        failure.addSuppressed(e);
                    }
                }
            }
            try {
                close();
            } catch (RuntimeException e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
            if (failure != null) {
                throw failure;
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * @return the composite holder that wraps the write and the read holder
     */
    public drds.plus.executor.transaction.ConnectionHolder getConnectionHolder() {
        return this.connectionHolder;
    }

    /**
     * Asks the owning holder to try to close the given connection.
     *
     * <p>The write holder is used when {@code connection} is the very instance last handed out
     * for a write (identity comparison); every other connection is passed to the read holder.
     *
     * @param dataNodeId id of the data node the connection belongs to
     * @param connection the connection to release
     * @throws SQLException if the holder fails to close the connection
     */
    public void tryClose(String dataNodeId, Connection connection) throws SQLException {
        lock.lock();
        try {
            if (connection == this.writeConnection) {
                this.writeConnectionHolder.tryClose(dataNodeId, connection);
            } else {
                this.readConnectionHolder.tryClose(dataNodeId, connection);
            }
        } finally {
            lock.unlock();
        }

    }

    /**
     * Closes the transaction and the connection sets of both holders.
     *
     * <p>Does nothing when the transaction is already closed. The closed check is made while
     * holding the lock so that concurrent callers cannot both run the close sequence, and both
     * holders are released even if an earlier step throws.
     */
    public void close() {
        lock.lock();
        try {
            if (isClosed()) {
                return;
            }
            try {
                super.close();
            } finally {
                try {
                    this.writeConnectionHolder.closeConnectionSet();
                } finally {
                    this.readConnectionHolder.closeConnectionSet();
                }
            }
        } finally {
            lock.unlock();
        }

    }

    /**
     * Closes the connection set of the read holder only; the write holder is left untouched.
     * Does nothing when the transaction is already closed.
     *
     * @throws SQLException declared by the signature; not raised by the visible code itself
     */
    public void tryClose() throws SQLException {
        if (isClosed()) {
            return;
        }
        lock.lock();
        try {
            this.readConnectionHolder.closeConnectionSet();
        } finally {
            lock.unlock();
        }

    }

}
